package gu.simplemq.redis;

import java.lang.reflect.Array;
import java.util.Collection;
import java.util.List;

import com.google.common.base.Function;
import com.google.common.reflect.TypeToken;

import gu.simplemq.Channel;
import gu.simplemq.IAdvisor;
import gu.simplemq.IDequeProducer;
import gu.simplemq.IProducer;
import gu.simplemq.json.BaseJsonEncoder;
import gu.simplemq.utils.CommonUtils;
import redis.clients.jedis.Jedis;

/**
 * {@link IProducer} redis实现
 * @author guyadong
 *
 */
public class RedisProducer implements IRedisComponent, IDequeProducer,RedisConstants{
    /** 是否向队列末尾添加 */
	protected boolean offerLast = true;
	private static final BaseJsonEncoder encoder = BaseJsonEncoder.getEncoder();
	/**
	 * 将对象序列化为JSON字符串的{@link Function} 实例
	 */
	private final static Function<Object, String> jsonFun = new Function<Object,String>(){
		@Override
		public String apply(Object input) {
			return encoder.toJsonString(input);
		}};
	private final JedisPoolLazy poolLazy;
	private final RedisConsumerAdvisor advisor;
	/**
	 * 将对象序列化为字符串的实例默序列化为JSON字符串
	 */
	private Function<Object, String> stringSerializer = jsonFun;
	@Override
	public JedisPoolLazy getPoolLazy() {
		return poolLazy;
	}
	
	public RedisProducer(JedisPoolLazy poolLazy) {
		super();
		this.poolLazy = poolLazy;
		this.advisor = RedisConsumerAdvisor.queueAdvisorOf(poolLazy);
	}
	
	@Override
	public <T> void produce(Channel<T> channel, T object, boolean offerLast) {
		if(null == object){
			return;
		}
		Jedis jedis = this.poolLazy.apply();
		try{
			if(offerLast){
				jedis.rpush(channel.name, this.stringSerializer.apply(object));
			}else{
				jedis.lpush(channel.name, this.stringSerializer.apply(object));
			}
		}finally{
			this.poolLazy.free();
		}		
	}
	
	@Override
	public <T> void produce(Channel<T> channel, T object) {
		produce(channel,object,this.offerLast);	
	}

	@Override
	public <T> void produce(Channel<T> channel, boolean offerLast, @SuppressWarnings("unchecked") T... objects) {
		List<T> list = CommonUtils.cleanNullAsList(objects);
		if(list.isEmpty()){
			return;
		}
		if(null != channel.type){
			// 检查发布的对象类型与频道数据类型是否匹配
			if(channel.type instanceof Class<?> && 
					!((Class<?>)channel.type).isAssignableFrom(objects.getClass().getComponentType())){
				throw new IllegalArgumentException("invalid component type of 'objects'");
			}
		}
		String[] strings = new String[list.size()];
		for(int i=0;i<strings.length;++i){
			strings[i] = this.stringSerializer.apply(list.get(i));
		}
		Jedis jedis = this.poolLazy.apply();
		try{
			if(offerLast){
				jedis.rpush(channel.name, strings);
			}else{
				jedis.lpush(channel.name, strings);
			}
		}finally{
			this.poolLazy.free();
		}
	}
	
	@Override
	public <T> void produce(Channel<T> channel, @SuppressWarnings("unchecked") T... objects) {
		produce(channel,this.offerLast,objects);
	}
	
	@Override
	@SuppressWarnings("unchecked")
	public <T> void produce(Channel<T> channel, boolean offerLast, Collection<T>c) {
		if(null != c ) {
			produce(channel,offerLast, c.toArray((T[]) Array.newInstance(TypeToken.of(channel.type).getRawType(), 0)));
		}
	}
	@Override
	public <T> void produce(Channel<T> channel, Collection<T>c) {
		produce(channel,this.offerLast,c);
	}
	@Override
	public void setOfferLast(boolean offerLast) {
		this.offerLast = offerLast;
	}


	@Override
	public IAdvisor getAdvisor() {
		return advisor;
	}
	@Override
	public IProducer withStringSerializer(Function<Object, String> stringSerializer) {
		this.stringSerializer = null == stringSerializer ? jsonFun: stringSerializer;
		return this;
	}
}
